home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++
# File: in.pyo (Python 2.3)
#
# Serial port access for POSIX-like systems, implemented with termios,
# fcntl and select.  This text was reconstructed from decompiler output;
# obvious decompiler artifacts (lost ``except`` clauses, mangled
# conditional expressions) have been repaired.

import sys
import os
import fcntl
import termios
import struct
import select
from serialutil import *

VERSION = '$Revision: 1.18 $'.split()[1]

# On old interpreters the constants live in the separate TERMIOS / FCNTL
# modules; newer ones expose them on termios / fcntl directly.
if sys.hexversion < 0x020100f0:
    import TERMIOS
else:
    TERMIOS = termios

if sys.hexversion < 0x020200f0:
    import FCNTL
else:
    FCNTL = fcntl

# Pick a device-name builder for the running platform.  Every variant maps
# a zero-based port number to a device path.
plat = sys.platform.lower()

if plat[:5] == 'linux':
    def device(port):
        """Return the device path for zero-based *port* (Linux naming)."""
        return '/dev/ttyS%d' % port

elif plat == 'cygwin':
    def device(port):
        """Return the device path for zero-based *port* (Cygwin naming)."""
        return '/dev/com%d' % (port + 1)

elif plat == 'openbsd3':
    def device(port):
        """Return the device path for zero-based *port* (openbsd3 naming)."""
        return '/dev/ttyp%d' % port

elif (plat[:3] == 'bsd' or
      plat[:6] == 'netbsd' or
      plat[:7] == 'freebsd' or
      plat[:7] == 'openbsd' or
      plat[:6] == 'darwin'):
    # The alternatives must be joined with ``or``: no platform string can
    # start with 'bsd' and 'netbsd' at the same time.
    def device(port):
        """Return the device path for zero-based *port* (BSD-style naming)."""
        return '/dev/cuaa%d' % port

elif plat[:4] == 'irix':
    def device(port):
        """Return the device path for zero-based *port* (IRIX naming)."""
        return '/dev/ttyf%d' % port

elif plat[:2] == 'hp':
    def device(port):
        """Return the device path for zero-based *port* ('hp*' platforms)."""
        return '/dev/tty%dp0' % (port + 1)

elif plat[:5] == 'sunos':
    def device(port):
        """Return the device path for zero-based *port* (SunOS naming)."""
        return '/dev/tty%c' % (ord('a') + port)

else:
    # Unknown platform: report what was detected, then carry on with a
    # best-effort guess instead of refusing to import.
    info = 'sys.platform = %r\nos.name = %r\nserialposix.py version = %s' % (
        sys.platform, os.name, VERSION)
    print(
        "send this information to the author of the pyserial:\n\n"
        "%s\n\n"
        "also add the device name of the serial port and where the\n"
        "counting starts for the first serial port.\n"
        "e.g. 'first serial port: /dev/ttyS0'\n"
        "and with a bit luck you can get this module running...\n" % info)

    def device(port):
        """Return a guessed device path for zero-based *port*."""
        return '/dev/ttyS%d' % port


# Lookup tables between numeric baud rates and the termios B<rate>
# constants.  Rates the platform does not define are simply left out.
baudEnumToInt = {}
baudIntToEnum = {}
for rate in (0, 50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400,
             4800, 9600, 19200, 38400, 57600, 115200, 230400, 460800,
             500000, 576000, 921600, 1000000, 1152000, 1500000, 2000000,
             2500000, 3000000, 3500000, 4000000):
    i = getattr(TERMIOS, 'B%d' % rate, None)
    if i is not None:
        baudEnumToInt[i] = rate
        baudIntToEnum[rate] = i


def _termios_const(name, fallback):
    """Return TERMIOS.<name> when defined and non-zero, else *fallback*."""
    return getattr(TERMIOS, name, 0) or fallback


# ioctl request codes and modem-line bit masks.  The platform's own value
# is preferred; the literals are only used when TERMIOS lacks the name.
TIOCMGET = _termios_const('TIOCMGET', 0x5415)
TIOCMBIS = _termios_const('TIOCMBIS', 0x5416)
TIOCMBIC = _termios_const('TIOCMBIC', 0x5417)
TIOCMSET = _termios_const('TIOCMSET', 0x5418)
TIOCM_DTR = _termios_const('TIOCM_DTR', 0x002)
TIOCM_RTS = _termios_const('TIOCM_RTS', 0x004)
TIOCM_CTS = _termios_const('TIOCM_CTS', 0x020)
TIOCM_CAR = _termios_const('TIOCM_CAR', 0x040)
TIOCM_RNG = _termios_const('TIOCM_RNG', 0x080)
TIOCM_DSR = _termios_const('TIOCM_DSR', 0x100)
TIOCM_CD = _termios_const('TIOCM_CD', TIOCM_CAR)
TIOCM_RI = _termios_const('TIOCM_RI', TIOCM_RNG)
TIOCINQ = _termios_const('FIONREAD', 0x541B)

# Pre-packed unsigned-int buffers handed to fcntl.ioctl().
TIOCM_zero_str = struct.pack('I', 0)
TIOCM_RTS_str = struct.pack('I', TIOCM_RTS)
TIOCM_DTR_str = struct.pack('I', TIOCM_DTR)


class Serial(SerialBase):
    """Serial port implementation on top of termios, fcntl and select.

    The open descriptor is kept in ``self.fd``; it is ``None`` while the
    port is closed.
    """

    def open(self):
        """Open the configured port and apply the current settings.

        Raises SerialException when no port is configured or the device
        cannot be opened or configured.
        """
        if self._port is None:
            raise SerialException('Port must be configured before it can be used.')
        self.fd = None
        try:
            self.fd = os.open(self.portstr,
                              os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except Exception:
            # sys.exc_info() keeps this parseable on old and new interpreters.
            msg = sys.exc_info()[1]
            self.fd = None
            raise SerialException('Could not open port: %s' % msg)
        try:
            # Clear the status flags again (drops the O_NONBLOCK used for
            # opening); read timeouts are handled with select() instead.
            fcntl.fcntl(self.fd, FCNTL.F_SETFL, 0)
            self._reconfigurePort()
        except:
            # Do not leak the descriptor when configuration fails.
            try:
                os.close(self.fd)
            except OSError:
                pass
            self.fd = None
            raise
        self._isOpen = True

    def _reconfigurePort(self):
        """Apply baud rate, framing and flow-control settings to the port.

        Raises SerialException when the port is not open or its attributes
        cannot be read, and ValueError for unsupported settings.
        """
        if self.fd is None:
            raise SerialException('Can only operate on a valid port handle')

        # VMIN/VTIME stay at zero: read() waits with select() instead.
        vmin = vtime = 0
        try:
            (iflag, oflag, cflag, lflag,
             ispeed, ospeed, cc) = termios.tcgetattr(self.fd)
        except termios.error:
            msg = sys.exc_info()[1]
            raise SerialException('Could not configure port: %s' % msg)

        # Enable the receiver, ignore modem control lines, and switch off
        # canonical mode, echo, signals and all input/output translation.
        cflag |= TERMIOS.CLOCAL | TERMIOS.CREAD
        lflag &= ~(TERMIOS.ICANON | TERMIOS.ECHO | TERMIOS.ECHOE |
                   TERMIOS.ECHOK | TERMIOS.ECHONL | TERMIOS.ECHOCTL |
                   TERMIOS.ECHOKE | TERMIOS.ISIG | TERMIOS.IEXTEN)
        oflag &= ~(TERMIOS.OPOST)
        if hasattr(TERMIOS, 'IUCLC'):
            iflag &= ~(TERMIOS.INLCR | TERMIOS.IGNCR | TERMIOS.ICRNL |
                       TERMIOS.IUCLC | TERMIOS.IGNBRK)
        else:
            iflag &= ~(TERMIOS.INLCR | TERMIOS.IGNCR | TERMIOS.ICRNL |
                       TERMIOS.IGNBRK)

        # Baud rate.
        try:
            ispeed = ospeed = baudIntToEnum[self._baudrate]
        except (KeyError, TypeError):
            raise ValueError('Invalid baud rate: %r' % self._baudrate)

        # Character size.
        cflag &= ~(TERMIOS.CSIZE)
        if self._bytesize == 8:
            cflag |= TERMIOS.CS8
        elif self._bytesize == 7:
            cflag |= TERMIOS.CS7
        elif self._bytesize == 6:
            cflag |= TERMIOS.CS6
        elif self._bytesize == 5:
            cflag |= TERMIOS.CS5
        else:
            raise ValueError('Invalid char len: %r' % self._bytesize)

        # Stop bits.
        if self._stopbits == STOPBITS_ONE:
            cflag &= ~(TERMIOS.CSTOPB)
        elif self._stopbits == STOPBITS_TWO:
            cflag |= TERMIOS.CSTOPB
        else:
            raise ValueError('Invalid stopit specification: %r' % self._stopbits)

        # Parity.
        iflag &= ~(TERMIOS.INPCK | TERMIOS.ISTRIP)
        if self._parity == PARITY_NONE:
            cflag &= ~(TERMIOS.PARENB | TERMIOS.PARODD)
        elif self._parity == PARITY_EVEN:
            cflag &= ~(TERMIOS.PARODD)
            cflag |= TERMIOS.PARENB
        elif self._parity == PARITY_ODD:
            cflag |= TERMIOS.PARENB | TERMIOS.PARODD
        else:
            raise ValueError('Invalid parity: %r' % self._parity)

        # Software (XON/XOFF) flow control; IXANY only where it exists.
        if hasattr(TERMIOS, 'IXANY'):
            if self._xonxoff:
                iflag |= TERMIOS.IXON | TERMIOS.IXOFF | TERMIOS.IXANY
            else:
                iflag &= ~(TERMIOS.IXON | TERMIOS.IXOFF | TERMIOS.IXANY)
        elif self._xonxoff:
            iflag |= TERMIOS.IXON | TERMIOS.IXOFF
        else:
            iflag &= ~(TERMIOS.IXON | TERMIOS.IXOFF)

        # Hardware (RTS/CTS) flow control, under whichever name exists.
        if hasattr(TERMIOS, 'CRTSCTS'):
            if self._rtscts:
                cflag |= TERMIOS.CRTSCTS
            else:
                cflag &= ~(TERMIOS.CRTSCTS)
        elif hasattr(TERMIOS, 'CNEW_RTSCTS'):
            if self._rtscts:
                cflag |= TERMIOS.CNEW_RTSCTS
            else:
                cflag &= ~(TERMIOS.CNEW_RTSCTS)

        if vmin < 0 or vmin > 255:
            raise ValueError('Invalid vmin: %r ' % vmin)
        cc[TERMIOS.VMIN] = vmin
        if vtime < 0 or vtime > 255:
            raise ValueError('Invalid vtime: %r' % vtime)
        cc[TERMIOS.VTIME] = vtime

        termios.tcsetattr(self.fd, TERMIOS.TCSANOW,
                          [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])

    def close(self):
        """Close the port if it is open; otherwise do nothing."""
        if self._isOpen:
            # ``is not None`` so that descriptor 0 is closed as well.
            if self.fd is not None:
                os.close(self.fd)
                self.fd = None
            self._isOpen = False

    def makeDeviceName(self, port):
        """Return the platform device path for the numeric *port*."""
        return device(port)

    def inWaiting(self):
        """Return the number of bytes waiting in the receive buffer."""
        if self.fd is None:
            raise portNotOpenError
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    def read(self, size = 1):
        """Read up to *size* bytes and return them as one string.

        Fewer bytes are returned when select() times out (``self.timeout``)
        or, with a non-negative timeout, when a read returns no data.
        """
        if self.fd is None:
            raise portNotOpenError
        chunks = []
        received = 0
        while received < size:
            (ready, _, _) = select.select([self.fd], [], [], self.timeout)
            if not ready:
                break  # timed out
            buf = os.read(self.fd, size - received)
            chunks.append(buf)
            received = received + len(buf)
            # A timeout of None means "block forever": keep waiting then.
            if self.timeout is not None and self.timeout >= 0 and not buf:
                break
        return ''.join(chunks)

    def write(self, data):
        """Write all of *data* to the port, looping over partial writes."""
        if self.fd is None:
            raise portNotOpenError
        t = len(data)
        d = data
        while t > 0:
            n = os.write(self.fd, d)
            d = d[n:]
            t = t - n

    def flush(self):
        """Wait until all queued output has been transmitted."""
        self.drainOutput()

    def flushInput(self):
        """Discard everything currently in the input buffer."""
        if self.fd is None:
            raise portNotOpenError
        termios.tcflush(self.fd, TERMIOS.TCIFLUSH)

    def flushOutput(self):
        """Discard everything currently in the output buffer."""
        if self.fd is None:
            raise portNotOpenError
        termios.tcflush(self.fd, TERMIOS.TCOFLUSH)

    def sendBreak(self):
        """Send a break condition (tcsendbreak with duration 0)."""
        if self.fd is None:
            raise portNotOpenError
        termios.tcsendbreak(self.fd, 0)

    def setRTS(self, on = 1):
        """Set the RTS line when *on* is true, clear it otherwise."""
        if self.fd is None:
            raise portNotOpenError
        if on:
            fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_RTS_str)
        else:
            fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_RTS_str)

    def setDTR(self, on = 1):
        """Set the DTR line when *on* is true, clear it otherwise."""
        if self.fd is None:
            raise portNotOpenError
        if on:
            fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_DTR_str)
        else:
            fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_DTR_str)

    def getCTS(self):
        """Return True when the CTS bit is set in the modem status."""
        if self.fd is None:
            raise portNotOpenError
        s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        return (struct.unpack('I', s)[0] & TIOCM_CTS) != 0

    def getDSR(self):
        """Return True when the DSR bit is set in the modem status."""
        if self.fd is None:
            raise portNotOpenError
        s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        return (struct.unpack('I', s)[0] & TIOCM_DSR) != 0

    def getRI(self):
        """Return True when the RI bit is set in the modem status."""
        if self.fd is None:
            raise portNotOpenError
        s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        return (struct.unpack('I', s)[0] & TIOCM_RI) != 0

    def getCD(self):
        """Return True when the CD bit is set in the modem status."""
        if self.fd is None:
            raise portNotOpenError
        s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        return (struct.unpack('I', s)[0] & TIOCM_CD) != 0

    def drainOutput(self):
        """Block until all output written to the port has been sent."""
        if self.fd is None:
            raise portNotOpenError
        termios.tcdrain(self.fd)

    def nonblocking(self):
        """Switch the descriptor to non-blocking mode."""
        if self.fd is None:
            raise portNotOpenError
        # os.O_NONBLOCK is the constant open() already uses; the fcntl
        # module is not guaranteed to provide it.
        fcntl.fcntl(self.fd, FCNTL.F_SETFL, os.O_NONBLOCK)


if __name__ == '__main__':
    # Small manual smoke test against the first serial port.
    s = Serial(0, baudrate = 19200, bytesize = EIGHTBITS,
               parity = PARITY_EVEN, stopbits = STOPBITS_ONE,
               timeout = 3, xonxoff = 0, rtscts = 0)
    s.setRTS(1)
    s.setDTR(1)
    s.flushInput()
    s.flushOutput()
    s.write('hello')
    print(repr(s.read(5)))
    print(s.inWaiting())
    del s